{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## <font color='green'> Retail - Frequent Pattern Mining<font>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### <font color='green'> 1. Description<font>\n",
    "\n",
    "Frequent Pattern Mining using Retail dataset.\n",
    "Dataset can be downloaded from http://fimi.uantwerpen.be/data/retail.dat\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### <font color='green'> 2. Data Preprocessing<font>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "import time\n",
    "import pandas as pd\n",
    "import numpy as np\n",
    "import random\n",
    "from collections import OrderedDict"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Encode string grocery data to numeric form\n",
    "def encode_data(data):\n",
    "    import itertools\n",
    "    unq = np.unique(list(itertools.chain.from_iterable(data)))\n",
    "    id = np.arange(1, len(unq) + 1, 1)\n",
    "    transmap = dict(zip(unq, id))\n",
    "    ret = []\n",
    "    for e in data:\n",
    "        enc = [int(transmap[i]) for i in e]\n",
    "        ret.append(enc)\n",
    "    return ret"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pyspark\n",
    "from pyspark.sql import SQLContext\n",
    "def create_spark_df(data):\n",
    "    sp_item_list = []\n",
    "    cnt = 0\n",
    "    for ilist in data:\n",
    "        sp_item_list.append((cnt, ilist))\n",
    "        cnt = cnt + 1\n",
    "    # Construct spark dataframe\n",
    "    sqlContext = SQLContext(sc)\n",
    "    sp_df = sqlContext.createDataFrame(data=sp_item_list, \\\n",
    "                                       schema=[\"id\", \"items\"])\n",
    "    return sp_df"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "def get_names(fname):\n",
    "    fp = open(fname, 'r')\n",
    "    line = fp.readline()\n",
    "    max_ncol = 0\n",
    "    while line:\n",
    "        ncol = len(line.split(\" \"))\n",
    "        if ncol > max_ncol:\n",
    "            max_ncol = ncol\n",
    "        line = fp.readline()\n",
    "    names = [\"item_\" + str(i) for i in range(max_ncol)]\n",
    "    return names"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "def preprocess_data(fname):\n",
    "    df = pd.read_csv(fname, sep = \" \",\n",
    "                     names = get_names(fname), # variable no. of fields in each line, hence col-names are provided\n",
    "                     engine = 'python')  # older pandas version has some parsing issue with c-engine\n",
    "    item_list = []\n",
    "    for ilist in df.values.tolist():\n",
    "        item = [itm for itm in ilist if str(itm) != 'nan']\n",
    "        item_list.append(item)\n",
    "    item_list = encode_data(item_list)\n",
    "    return item_list"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Dataset contains 88162 item sets\n"
     ]
    }
   ],
   "source": [
    "#---- Data Preparation ----\n",
    "# Please download the dataset from below link.\n",
    "# http://fimi.uantwerpen.be/data/retail.dat\n",
    "\n",
    "DATA_FILE = \"datasets/retail.dat\"\n",
    "item_list = preprocess_data(DATA_FILE)\n",
    "print(\"Dataset contains {} item sets\".format(len(item_list)))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### <font color='green'> 3. Algorithm Evaluation<font>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "train_time = []\n",
    "test_time = []\n",
    "estimator_name = []"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "def evaluate(estimator, estimator_nm, data):\n",
    "    estimator_name.append(estimator_nm)\n",
    "    start_time = time.time()\n",
    "    model = estimator.fit(data)\n",
    "    if \"pyspark\" in estimator_nm:\n",
    "        print(\"total FIS count: %d\" % (model.freqItemsets.count())) # Count is required to actually invoke the spark operation (since it is lazy)\n",
    "    else:\n",
    "        print(\"total FIS count: %d\" % (len(model.freqItemsets)))\n",
    "    train_time.append(round(time.time() - start_time, 4))\n",
    "\n",
    "    start_time = time.time()\n",
    "    sp_rules = model.associationRules\n",
    "    if \"pyspark\" in estimator_nm:\n",
    "        print(\"total Rule count: %d\" % (sp_rules.count())) # Count is required to actually invoke the spark operation (since it is lazy)\n",
    "    else:\n",
    "        print(\"total Rule count: %d\" % (len(sp_rules)))\n",
    "    test_time.append(round(time.time() - start_time, 4))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### 3.1 FPGrowth"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "total FIS count: 7589\n",
      "total Rule count: 8668\n"
     ]
    }
   ],
   "source": [
    "import pyspark\n",
    "from pyspark.ml.fpm import FPGrowth as pysparkFPGrowth\n",
    "sc = pyspark.SparkContext(appName=\"fpgrowth\")\n",
    "s_est = pysparkFPGrowth(minSupport=0.001, minConfidence=0.05)\n",
    "e_nm = \"fpgrowth_pyspark_\" + pyspark.__version__\n",
    "evaluate(s_est, e_nm, create_spark_df(item_list))\n",
    "sc.stop()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "total FIS count: 7589\n",
      "total Rule count: 8668\n"
     ]
    }
   ],
   "source": [
    "import frovedis\n",
    "from frovedis.exrpc.server import FrovedisServer\n",
    "FrovedisServer.initialize(\"mpirun -np 8 \" +  os.environ[\"FROVEDIS_SERVER\"])\n",
    "from frovedis.mllib.fpm import FPGrowth as frovFPGrowth\n",
    "f_est = frovFPGrowth(minSupport=0.001, minConfidence=0.05, mem_opt_level = 1)\n",
    "e_nm = \"fpgrowth_frovedis_\" + frovedis.__version__\n",
    "evaluate(f_est, e_nm, item_list)\n",
    "f_est.release()\n",
    "FrovedisServer.shut_down()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### <font color='green'> 4. Performance summary<font>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>estimator</th>\n",
       "      <th>train time</th>\n",
       "      <th>test time</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>fpgrowth_pyspark_3.0.2</td>\n",
       "      <td>37.5539</td>\n",
       "      <td>22.8813</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>fpgrowth_frovedis_0.9.10</td>\n",
       "      <td>1.1794</td>\n",
       "      <td>0.4434</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "                  estimator  train time  test time\n",
       "0    fpgrowth_pyspark_3.0.2     37.5539    22.8813\n",
       "1  fpgrowth_frovedis_0.9.10      1.1794     0.4434"
      ]
     },
     "execution_count": 11,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "summary = pd.DataFrame(OrderedDict({ \"estimator\": estimator_name,\n",
    "                                     \"train time\": train_time,\n",
    "                                     \"test time\": test_time\n",
    "                                  }))\n",
    "summary"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Frovedis Speed-up: 31.84\n"
     ]
    }
   ],
   "source": [
    "speed_up = train_time[0] / train_time[1]\n",
    "print(\"Frovedis Speed-up: %.2f\" % (speed_up))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.9"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
